package cn.doitedu.udfs;

import cn.doitedu.pojo.LogBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class JsonToBeanFunction extends ProcessFunction<String, LogBean> {


    @Override
    public void processElement(String line, Context ctx, Collector<LogBean> out) throws Exception {

        try {
            LogBean logBean = JSON.parseObject(line, LogBean.class);
            if(logBean.getDeviceId() != null && !"".equals(logBean.getDeviceId())) {
                out.collect(logBean);
            }
        } catch (Exception e) {
            //将有问题的数据记录到log文件中或写入到hbase、kafka
        }
    }
}
